package cn.pink.core;

import cn.hutool.core.util.StrUtil;
import cn.pink.core.config.IronConfig;
import cn.pink.core.interfaces.IThreadCase;
import cn.pink.core.stat.StatRPC;
import cn.pink.core.support.*;
import cn.pink.core.support.function.*;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Port
 * @Author: pink
 * @Date: 2022/5/18 20:51
 */
public class Port implements IThreadCase {
    private Node node;
    protected String portId;

    /** 默认异步请求都是30秒过期 */
    public static final int DEFAULT_TIMEOUT = 30 * 1000;

    /** 当前线程开始时间(毫秒) */
    private long timeCurrent = 0;
    /** 发出的最后一个请求ID号 */
    private long sendLastCallId = 0;

    /** 当前线程的Port实例 */
    final static ThreadLocal<Port> portCurrent = new ThreadLocal<>();

    /** 下属服务 */
    private final Map<Object, Service> services = new ConcurrentHashMap<>();

    /** 正在处理中的Call请求 利用LinkedList来模拟栈 */
    private final LinkedList<Call> callHandling = new LinkedList<>();
    /** 接收到待处理的请求 */
    private final ConcurrentLinkedQueue<Call> calls = new ConcurrentLinkedQueue<>();
    /** 接收到的请求返回值 */
    private final ConcurrentLinkedQueue<Call> callResults = new ConcurrentLinkedQueue<>();
    /** 本次心跳需要处理的请求 */
    private final List<Call> pulseCalls = new ArrayList<>();
    /** 本次心跳需要处理的请求返回值 */
    private final List<Call> pulseCallResults = new ArrayList<>();
    /** 待处理执行队列 */
    private final ConcurrentLinkedQueue<PortTask> tasks = new ConcurrentLinkedQueue<>();

    /** 请求返回值监听 */
    private final Map<Long, CallResultBase> callResultListener = new HashMap<>();
    /** 请求返回值监听超时计时器 */
    private final TickTimer callResultTimeoutCleanTimer = new TickTimer(1000);

    /** 心跳时间秒表 */
    private final StepWatch pulseStepWatch = new StepWatch(Log.effect.isDebugEnabled());

    /** 任务队列调度器 */
    protected Scheduler scheduler;

    public Port(String portId) {
        this.portId = portId;
    }

    /**
     * 启动port
     * 中心节点开启, 中心节点单实例单线程, 不会有太大性能损耗, 中心服开启最为合适. 除了中心节点其他节点没必要开启定时器, 耗性能.
     * 有每秒和没分的tick就够用了, 假如真的需要复杂定时可以在中心节点设置定时器通过rpc通知其他节点的方式
     * @param node 关联node
     * @param openScheduler 是否开启定时器, verticle多实例部署定时器会报定时器已存在错, 跟踪报错源码SchedulerRepository.java bind()方法 verticle部署几个实例就执行几遍, 怀疑跟多线程多实例有关
     */
    public void startup(Node node, boolean openScheduler) {
        this.node = node;
        this.node.port = this;

        // 初始化quartz相关环境
        if(openScheduler) {
            try {
                this.scheduler = new StdSchedulerFactory().getScheduler();
                this.scheduler.start();
            } catch (Exception e) {
                Log.error.error(ExceptionUtils.getStackTrace(e));
                throw new SysException(e);
            }
        }

        //日志
//        Log.system.info("启动Port={}", this);
    }

    public String getId() {
        return portId;
    }

    public String getNodeId() {
        return node.getId();
    }

    /**
     * 获取当前线程的Port实例
     */
    @SuppressWarnings("unchecked")
    public static <T extends Port> T getCurrent() {
        return (T) portCurrent.get();
    }

    /**
     * 获取服务
     */
    @SuppressWarnings("unchecked")
    public <T extends Service> T getService(Object id) {
        return (T) services.get(id);
    }

    @SuppressWarnings("unchecked")
    public <T extends Service> Collection<T> getServices() {
        return (Collection<T>) services.values();
    }
    /**
     * 添加新服务
     */
    public void addService(Service service) {
        services.put(service.getId(), service);
    }

    /**
     * 删除服务
     */
    public void delService(Object id) {
        Service serv = services.get(id);
        try {
            serv.deleteSchedulerJobsByGroup(id.toString());
        } catch (SchedulerException e) {
            Log.error.error("删除service时，清空该service的scheduler出错", e);
            /*LogCore.core.error("删除service时，清空该service的scheduler出错", e);*/
        }

        services.remove(id);
    }

    /**
     * 删除服务
     */
    public void delServiceBySafe(Object id) {
        // 避免由于删除服务，造成心跳内后续操作报错，所以将实际删除工作延后至下一心跳
        this.addTask(new PortTask(id) {
            public void execute(Port port) {
                Service serv = services.get(param.get());
                if (serv == null) return ;

                //将添加在service上的定时任务清除掉
                try {
                    serv.deleteSchedulerJobsByGroup(param.get().toString());
                } catch (SchedulerException e) {
                    Log.error.error("删除service时，清空该service的scheduler出错", e);
                    /*LogCore.core.error("删除service时，清空该service的scheduler出错", e);*/
                }

                port.removeService(param.get());
            }
        });
    }

    public void removeService(Object servId) {
        services.remove(servId);
    }

    /**
     * 当前线程开始时间(毫秒)
     */
    public long getTimeCurrent() {
        return timeCurrent;
    }

    /**
     * 获取系统时间
     */
    public static long getTime() {
        return getCurrent().getTimeCurrent();
    }

    @Override
    public void caseStart() {
        portCurrent.set(this);
    }

    @Override
    public void caseStop() {

    }

    @Override
    public void caseRunOnce(int delay) {
        pulse(delay);
    }

    /**
     * 每帧心跳
     */
    private void pulse(int delay) {
        //记录下心跳开始时的时间戳 供之后的操作来统一时间
        timeCurrent = System.currentTimeMillis();

        //确认本心跳要执行的call及result
        pulseCallAffirm();

        //记录一些日志调试信息
        int countCall = pulseCalls.size();
        int countResult = pulseCallResults.size();
        if (countCall > 0) {
            Log.effect.debug("callCount={}, resultCount={}",	countCall, countResult);
        }

        //计时开始
        StepWatch sw = pulseStepWatch;
        sw.step();

        /* 执行本心跳的任务 */
        //Call请求
        pulseCalls();
        sw.logTime("call");

        //处理返回值
        pulseCallResults();
        sw.logTime("result");

        //清理超时返回值监控
        pulseCallResultsTimeoutClean();
        sw.logTime("resultClean");

        //调用下属服务
        pulseServices();
        sw.logTime("service");

        //执行等待任务队列
        pulseQueue();
        sw.logTime("queue");

        // TODO 发送实体属性修改变化

        //记录下时间比较长的心跳操作
        long timeFinish = System.currentTimeMillis();
        if (timeFinish - timeCurrent >= IronConfig.OB_PULSE_VPT) {
            Log.effect.warn("本次心跳操作总时间较长，达到了{}毫秒。"
                            + "portName={}, callCount={}, resultCount={}, countQueue={}, time={}", timeFinish - timeCurrent, this.node.getId() + "." + this.portId,
                    countCall, countResult, tasks.size(), sw.getLog(true));
        }
    }

    /**
     * 确认本心跳要执行的call及result
     */
    private void pulseCallAffirm() {
        // 本心跳要执行的call
        Call call;
        while ((call = calls.poll()) != null) {
            pulseCalls.add(call);
        }

        // 本心跳要执行的callResult
        Call callResult;
        while ((callResult = callResults.poll()) != null) {
            pulseCallResults.add(callResult);
        }
    }

    /**
     * 心跳中处理请求
     */
    private void pulseCalls() {
        while (!pulseCalls.isEmpty()) {
            // 因为下面的try中需要与出栈入栈配合 所以这句就不放在try中了
            Call call = pulseCalls.remove(0);

            dispatchCall(call);
        }
    }

    /**
     * 心跳中处理请求返回值
     */
    private void pulseCallResults() {
        while (!pulseCallResults.isEmpty()) {
            try {
                Call call = pulseCallResults.remove(0);
                // 处理返回值
                CallResultBase listener = callResultListener.remove(call.id);
                if (listener != null) {
                    listener.onResult(call);
                }
            } catch (Throwable e) {
                // 不做任何处理 仅仅抛出异常
                // 避免因为一个任务的出错 造成后续的任务无法继续执行 需要等到下一个心跳
                Log.error.error("", e);
                /*log.error("", e);*/
            }
        }
    }

    /**
     * 调用下属服务的心跳操作 默认启动本操作 如果子Port不想自动调用可以覆盖本函数
     */
    protected void pulseServices() {
        for (Service o : services.values()) {
            try {
                o.pulse();
            } catch (Throwable e) {
                // 不做任何处理 仅仅抛出异常
                // 避免因为一个任务的出错 造成后续的任务无法继续执行 需要等到下一个心跳
                Log.error.error("", e);
                /*log.error("", e);*/
            }
        }
    }

    /**
     * 清理超时的返回值监听
     */
    private void pulseCallResultsTimeoutClean() {
        // 间隔一段时间清理一次
        if (!callResultTimeoutCleanTimer.isPeriod(timeCurrent)) {
            return;
        }

        // 超时的返回值
        List<CallResultBase> timeoutResult = new ArrayList<>();
        for (CallResultBase r : callResultListener.values()) {
            if (!r.isTimeout()) {
                continue;
            }

            timeoutResult.add(r);
        }

        // 删除超时的监听
        for (Iterator<CallResultBase> iter = timeoutResult.iterator(); iter.hasNext();) {
            CallResultBase r = iter.next();
            // 删除监听
            callResultListener.remove(r.getCallId());
            try {
                // 执行清理
                r.onTimeout();
            } catch (Throwable e) {
                Log.error.error("callResult超时清理异常，r={}", r, e);
                /*log.error("callResult超时清理异常，r={}", r.toString(), e);*/
            }
        }
    }

    /**
     * 心跳中执行队列任务
     */
    private void pulseQueue() {
        PortTask msg;
        while ((msg = tasks.poll()) != null) {
            try {
                msg.execute(this);
            } catch (Throwable e) {
                // 不做任何处理 仅仅抛出异常
                // 避免因为一个任务的出错 造成后续的任务无法继续执行 需要等到下一个心跳
                Log.error.error("", e);
                /*log.error("", e);*/
            }
        }
    }

    /**
     * 调用call
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void dispatchCall(Call call) {
        try {
            // 压入栈 记录正在处理的Call请求
            callHandling.addLast(call);

            // 执行Call请求
            Service serv = getService(call.to.servId);
            if (serv == null) {
                Log.system.warn("执行Call队列时无法找到处理服务：call={}", call);
            } else {
                Object f = serv.getMethodFunction(call.methodKey);
                Object[] m = call.methodParam;

                //开启统计
                long start = IronConfig.STAT_ENABLE ? System.nanoTime() : 0;

                switch (call.methodParam.length) {
                    case 0: ((IronFunction0) f).apply(); break;
                    case 1: ((IronFunction1) f).apply(m[0]); break;
                    case 2: ((IronFunction2) f).apply(m[0], m[1]); break;
                    case 3: ((IronFunction3) f).apply(m[0], m[1], m[2]); break;
                    case 4: ((IronFunction4) f).apply(m[0], m[1], m[2], m[3]); break;
                    case 5: ((IronFunction5) f).apply(m[0], m[1], m[2], m[3], m[4]); break;
                    case 6: ((IronFunction6) f).apply(m[0], m[1], m[2], m[3], m[4], m[5]); break;
                    case 7: ((IronFunction7) f).apply(m[0], m[1], m[2], m[3], m[4], m[5], m[6]); break;
                    case 8: ((IronFunction8) f).apply(m[0], m[1], m[2], m[3], m[4], m[5], m[6], m[7]); break;
                    case 9: ((IronFunction9) f).apply(m[0], m[1], m[2], m[3], m[4], m[5], m[6], m[7], m[8]); break;
                    default: break;
                }

                if (start > 0) {
                    StatRPC.rpc(StrUtil.format("{}${}", serv.getClass().toString(), call.methodKey), System.nanoTime()-start);
                }
            }
        } catch (Throwable e) {
            // 不做任何处理 仅仅记录异常
            // 避免因为一个任务的出错 造成后续的任务无法继续执行 需要等到下一个心跳
            Log.error.error("执行Call队列时发生错误: call={}", call, e);
        } finally {
            // 请求处理完毕 记录出栈
            callHandling.removeLast();
        }
    }

    /**
     * 发起一个远程调用RPC请求
     */
    public void call(CallPoint toPoint, int methodKey, Object[] methodParam) {
        //拼装请求
        Call call = new Call();
        call.id = applyCallId();

        call.type = Call.TYPE_RPC;

        call.methodKey = methodKey;
        call.methodParam = methodParam;

        call.fromNodeId = node.getId();
        call.fromPortId = portId;

        call.to = toPoint;

        sendCall(call);
    }

    /**
     * 生成一个call
     */
    public Call makeCall(CallPoint toPoint, int methodKey, Object[] methodParam) {
        Call call = new Call();
        call.id = applyCallId();

        call.type = Call.TYPE_RPC;

        call.methodKey = methodKey;
        call.methodParam = methodParam;

        call.fromNodeId = node.getId();
        call.fromPortId = portId;

        call.to = toPoint;

        return call;
    }

    /**
     * 发起一个远程调用RPC请求
     */
    private void sendCall(Call call) {
        node.sendCall(call);
    }

    /**
     * 申请一个新的请求ID
     */
    private long applyCallId() {
        return ++sendLastCallId;
    }

    /**
     * 添加待处理请求
     */
    public void addCall(Call call) {
        calls.add(call);
    }

    /**
     * 添加待处理请求返回值
     */
    public void addCallResult(Call call) {
        callResults.add(call);
    }

    /**
     * 添加延后队列任务
     */
    public void addTask(PortTask task) {
        tasks.add(task);
    }

    /**
     * 监听请求返回值
     */
    public void listenResult(IronFunction2<Param, Param> method, Object... context) {
        listenResult(method, new Param(context));
    }

    public void listenResult(IronFunction3<Boolean, Param, Param> methodTimeout, Object... context) {
        listenResult(methodTimeout, new Param(context));
    }

    /**
     * 监听请求返回值
     */
    public void listenResult(IronFunction2<Param, Param> method, Param context) {
        //加入一个异步监听
        CallResultBase crb = new CallResultAsync(sendLastCallId, Port.DEFAULT_TIMEOUT, method, context);
        callResultListener.put(sendLastCallId, crb);
    }

    public void listenResult(IronFunction3<Boolean, Param, Param> method, Param context) {
        //加入一个异步监听
        CallResultBase crb = new CallResultAsync(sendLastCallId, Port.DEFAULT_TIMEOUT, method, context);
        callResultListener.put(sendLastCallId, crb);
    }

    /**
     * 发送请求返回值
     */
    public void returns(Object... values) {
        Call call = callHandling.getLast();
        returns(call.createCallReturn(), new Param(values));
    }

    /**
     * 发送请求返回值
     */
    public void returns(CallReturn callReturn, Object... values) {
        returns(callReturn, new Param(values));
    }

    /**
     * 发送请求返回值
     */
    public void returns(CallReturn callReturn, Param values) {
        Call call = new Call();
        call.id = callReturn.id;
        call.type = Call.TYPE_RPC_RETURN;
        call.methodParam = new Object[0];

        call.fromNodeId = node.getId();
        call.fromPortId = portId;

        call.to = new CallPoint();
        call.to.nodeId = callReturn.nodeId;
        call.to.portId = callReturn.portId;

        call.returns = values;
        sendCall(call);
    }

    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("nodeId", getNodeId())
                .append("portId", getId())
                .toString();
    }
}
